新感覚なELTツール「Meltano」を使ってSlackのデータをDWHに連携してみた
大阪オフィスの玉井です。
今回はMeltanoというツールをご紹介します。
Meltanoとは?
公式から引用すると「DataOps時代におけるEL(T)ツール」だそうです。
…
…私の個人的な感覚で説明しますが、絶妙な位置づけのEL(T)ツールです。やることはELT(メインはEL)なのですが、とにかく位置づけが絶妙なのです。
Meltanoの絶妙なポジション
データをE(抽出)してL(ロード)する…という仕組みを行いたいとき、ざっくり分けると、下記のどちらかを選ぶと思います。
- 手動で開発する(Pythonなど)
- そういうサービスを導入する(Fivetranなど)
前者は何でもできますが、人と時間のコストが半端じゃないです。後者はめちゃくちゃ楽ですが、ちょっとカスタマイズしたいみたいな時に、あまり身動きがとれません(融通が効きづらい)。
Meltanoは上記の中間に位置する感じです。コードやコマンドで設定を行うのですが、データの抽出部分やロード部分はパッケージングされており、あとはそれを組み合わせるだけです、面倒な部分は作らなくてよくて、設定部分はコードベースで自由に管理する、非常に新感覚なET(T)ツールとなっています。
Tは?
さっきから「EL(T)」という謎めいた表現をしていますが、これには理由があります。
Meltanoが単体で保有している機能はE(抽出)とL(ロード)だけです。ではT(変換)はどうするのかというと、dbtと連携して実現します。要するに、Tはdbtに一任するということです。この発想も「ほう!」ってなりました。
Singer
MeltanoはSingerという技術が利用されています。
データ移行を簡単に行うためのオープンソースのツールです。ファイル統合サービスでおなじみのStitchでも、このSingerを使うことが出来ます。
Singer tapと呼ばれる、抽出やロードをパッケージングしたものを組み合わせてデータ移行の仕組みを構築することができます。つまり、上述したMeltanoの絶妙な位置づけは、このSingerによるところが大きいといえるでしょう。
従来のSingerユーザーであれば、めちゃくちゃ簡単にMeltanoを利用することができると思います(Singer知らなくてもかなり簡単に使えます)。
やってみた
下記を参考にします。このチュートリアルはGitlabのデータを使用していますが、私はSlack(社内で使っているやつ)のデータを使いました。
検証環境
- macOS Big Sur 11.5.2
- fish 3.3.1
- Python 3.9.7
- pip 21.2.4
- meltano 1.80.1
やることまとめ
Slackにある私の分報チャネルに関するデータをDWH(BQ)にロードします(集計もちょっとやりたい)。
Meltanoのインストール
Dockerコンテナも用意されていますが、今後のコマンドの実行が面倒そうだったので、ローカルにインストールしました。
ドキュメントにならって(Pythonのバージョンはドキュメントに記載されているものと異なりますが)、専用ディレクトリを用意し、venv
を使用した仮想環境内にmeltanoをインストールすることにします。
> mkdir meltano-projects > cd meltano-projects > python3 -m venv .venv > source .venv/bin/activate.fish
ここまで出来たら下準備は完了です。meltanoをインストールします。
> pip3 install meltano
動作確認します。
> meltano --version meltano, version 1.80.1
Meltano用のプロジェクトを作成する
インストールが完了したら、最初にやることは、Meltano Projectの用意です。下記のコマンドで生成できます。
> meltano init test_pj
こんな感じの構造をしたディレクトリ群が作成されます。Meltano Projectの実態はコードなので、Gitでバージョン管理することもできます。
. ├── README.md ├── analyze ├── extract ├── load ├── meltano.yml ├── model ├── notebook ├── orchestrate ├── requirements.txt └── transform
後述しますが、meltano.yml
がProjectの核となるファイルです。ここにデータ連携の設定が記述されていきます。
extractorの設定を行う
Projectを作ったら、次にやることは、「何のデータを持ってくるか(抽出するか、Extract)」です。Meltanoはextractorというものを設定して、データの抽出元を定義します。
どういうものに対応しているのかは、下記のコマンドで確認できます。
> meltano discover extractors Extractors tap-adwords tap-ask-nicely tap-bigquery tap-bing-ads tap-chargebee tap-csv tap-facebook tap-fastly tap-gitlab tap-github ...
GithubやZendeskといったアプリケーションのデータや、csvファイル等に対応しています。この中から、自分が使用したいextractorを選んでインストールする形となります。自分が求めているものがない場合は自作することも可能だそうです。
extractorのインストール
今回は、冒頭で述べたとおり、Slackのデータを使いたいので、tap-slack
というextractorをインストールします。
> meltano add extractor tap-slack Added extractor 'tap-slack' to your Meltano project Repository: https://github.com/Mashey/tap-slack Documentation: https://hub.meltano.com/extractors/slack.html Installing extractor 'tap-slack'... Installed extractor 'tap-slack' To learn more about extractor 'tap-slack', visit https://hub.meltano.com/extractors/slack.html
インストールしたextractorの使い方はhelpオプションで呼び出すことが出来ます。
> meltano invoke tap-slack --help usage: tap-slack [-h] -c CONFIG [-s STATE] [-p PROPERTIES] [--catalog CATALOG] [-d] optional arguments: -h, --help show this help message and exit -c CONFIG, --config CONFIG Config file -s STATE, --state STATE State file -p PROPERTIES, --properties PROPERTIES Property selections: DEPRECATED, Please use --catalog instead --catalog CATALOG Catalog file -d, --discover Do schema discovery
extractorの設定値のセット
extractorには、それぞれ設定できる項目(環境変数みたいな)のようなものがあり、それをセットしていくことで、設定を行います。中には設定必須な項目もあり、それを設定するまでは、extractorは正しく動作しません。
設定項目については、下記のコマンドで確認することができます。
> meltano config tap-slack list api_token [env: TAP_SLACK_API_TOKEN] current value: None (default) start_date [env: TAP_SLACK_START_DATE] current value: None (default) Sync Start Date: Determines how much historical data will be extracted. Please be aware that the larger the time period and amount of data, the longer the initial extraction can be expected to take. channels [env: TAP_SLACK_CHANNELS] current value: None (default) By default the tap will sync all channels it has been invited to, but this can be overriden to limit it ot specific channels. Note this needs to be channel ID, not the name, as recommended by the Slack API. To get the ID for a channel, either use the Slack API or find it in the URL. private_channels [env: TAP_SLACK_PRIVATE_CHANNELS] current value: True (default) Join Private Channels: Specifies whether to sync private channels or not. Default is true. join_public_channels [env: TAP_SLACK_JOIN_PUBLIC_CHANNELS] current value: False (default) Join Public Channels: Specifies whether to have the tap auto-join all public channels in your ogranziation. Default is false. archived_channels [env: TAP_SLACK_ARCHIVED_CHANNELS] current value: False (default) Sync Archived Channels: Specifies whether the tap will sync archived channels or not. Note that a bot cannot join an archived channel, so unless the bot was added to the channel prior to it being archived it will not be able to sync the data from that channel. Default is false. date_window_size [env: TAP_SLACK_DATE_WINDOW_SIZE] current value: 7 (default) Date Window Size: Specifies the window size for syncing certain streams (messages, files, threads). The default is 7 days. To learn more about extractor 'tap-slack' and its settings, visit https://hub.meltano.com/extractors/slack.html
…実は大体のextractorはドキュメントが用意されていて、こっちを見たほうが早いです…。
tap-slack
に関しては、SlackのAPIを経由してデータを取得するので、SlackのAPIトークンは必須です。項目に値をセットするコマンドは下記の通り。トークン以外にも、抽出対象とするチャネルや(IDを指定する)、抽出するデータの開始時期を指定しました(テスト的な利用なので過去1年分のみを抽出するように設定)。
> meltano config tap-slack set token <トークン> Extractor 'tap-slack' setting 'token' was set in `meltano.yml`: '<トークン>' > meltano config tap-slack set channels '["チャネルID"]' Extractor 'tap-slack' setting 'channels' was set in `meltano.yml`: ['チャネルID'] > meltano config tap-slack set start_date 2020-10-01T00:00:00Z Extractor 'tap-slack' setting 'start_date' was set in `meltano.yml`: '2020-10-01T00:00:00Z'
extractorで取得したいデータの設定
extractor自体の設定を終えたら、最後は「抽出したいデータ」を選びます。下記のコマンドで抽出できるデータの一覧が確認できます。ここから抽出したいデータを選んでいきます。
attributesの1つ1つがカラムになるイメージです。ドットの前の接頭辞(?)にあたる名前がテーブル名になるイメージです(channels.channel_id
→channelsというテーブルにchannel_idというカラムができる)。
> meltano select tap-slack --list --all Legend: selected excluded automatic Enabled patterns: *.* Selected attributes: [automatic] channel_members.channel_id [automatic] channel_members.user_id [selected ] channels.channel_id [selected ] channels.created [selected ] channels.creator [automatic] channels.id [selected ] channels.is_archived [selected ] channels.is_channel [selected ] channels.is_ext_shared [selected ] channels.is_general [selected ] channels.is_group [selected ] channels.is_im [selected ] channels.is_member [selected ] channels.is_mpim [selected ] channels.is_org_shared [selected ] channels.is_pending_ext_shared ...
抽出したいデータの選択は、下記のコマンドで行います。
# 1つだけ選ぶ場合 > meltano select tap-slack channels channel_id # カテゴリ下全てを選ぶ場合 > meltano select tap-slack messages "*" # 逆に除外する場合 > meltano select tap-slack --exclude channels.channel_id
選んだデータの確認。
> meltano select tap-slack --list
loaderの設定を行う
抽出(E)の設定を終えたら、今度はロード(L)の設定を行います。基本的な流れはextractorと一緒です。
loaderのインストール
まず、どのようなloaderがあるのか確認します。主要なDWHやDBは網羅されている印象です。csvファイルとして出力することもできます。
> meltano discover loaders Loaders target-bigquery target-csv, variants: hotgluexyz (default), singer-io target-jsonl target-postgres, variants: datamill-co (default), transferwise, meltano target-snowflake, variants: datamill-co (default), transferwise, meltano target-sqlite target-redshift
今回はBigQueryにロードしたいので、下記コマンドでBQ用のloaderをインストールします。
> meltano add loader target-bigquery
loaderの設定値のセット
extractorと同様、設定値をセットしてきます。ドキュメントは下記。
ドキュメントによれば、必須設定値は(BQの)Project ID
とCredentials Path
です。それぞれset
コマンドでセットしてきます。Credentials Pathというのは、BQに対してアクセスできるサービスアカウントのclient_secretsファイルをフルパスで指定します。
> meltano config target-bigquery set credentials_path <ファイルのフルパス>
参考までに、必須じゃない設定として、ロケーションも設定しました(未設定時の値はUS)
> meltano config target-bigquery set location asia-northeast1
EL(T)ジョブの実行
抽出(E)とロード(L)の準備ができたので、いよいよ実際にジョブを実行します(今回はTは未設定)。
meltano.ymlの確認
実は、今までコマンドで設定してきた値などは、全てmeltano.yml
に書き込まれています。これ、実は、直接編集しても反映されるため、慣れてきたら、いちいちコマンドを打たなくても、直接ファイルをダーッと書いて設定することも可能です。
今まで設定した私のymlファイルは下記のようになりました。
version: 1 send_anonymous_usage_stats: true project_id: 3a916aba-7b90-4dbe-b708-13ed3fc81d15 plugins: extractors: - name: tap-slack pip_url: git+https://github.com/Mashey/tap-slack.git config: channels: - <チャネルID> start_date: '2020-10-01T00:00:00Z' token: <トークン> # 今回ロードするSlackのデータ select: - messages.* - users.* - user_groups.* - threads.* # 下記のデータだけなぜかエラーになるため除外 - '!messages.last_read' loaders: # 一応CSV出力もできるように入れました - name: target-csv variant: hotgluexyz pip_url: git+https://github.com/hotgluexyz/target-csv.git@0.3.3 config: destination_path: <csvを出力するパス> # こっちがBQ - name: target-bigquery variant: adswerve pip_url: git+https://github.com/adswerve/target-bigquery.git@v0.10.2 config: project_id: tamai-rei location: asia-northeast1 credentials_path: <ファイルのパス>
上記ファイル、現在のところ未解決の課題があります。それは、SlackのAPIトークンがベタ書きされているということです。これはただの検証だからいいのですが、本番運用を想定して、Gitでバージョン管理しようとしたとき、当然このymlファイルは管理の対象になるため、このままあげちゃうのは非常によろしくありません。ドキュメントには、コマンドで設定すれば、「Git管理外のファイルにひっそり設定される」という旨が書かれてあったのですが、その状態だと「トークンがない」というエラーで動作しませんでした。現状、ymlファイルに直接書かないと動作できておらず、ここは後日しっかり確認したいと思います。
ジョブの実行
設定が問題ないことを確認したら、あとはジョブを実行するだけです。
elt
というコマンドで実行できます。その際、extractorとloaderを指定します。複数回実行するとき、job_id
が同じ場合、インクリメンタルにデータを同期してくれるようです。あとログも、このID別に記録されます。
> meltano elt tap-slack target-bigquery --job_id=slack-to-bq meltano | Running extract & load... meltano | Found state from 2021-09-22 03:16:32.538159. tap-slack | INFO Starting Sync.. (中略) meltano | Incremental state has been updated at 2021-09-22 03:18:08.793729. meltano | Extract & load complete! meltano | Transformation skipped. meltano | INFO Starting Discovery..INFO Finished Discovery..⏎
データを確認してみる
ジョブが終わったので、BQを見てみます。
データセット名は指定しなかったので(loader側で設定可能)、extractorの名前がそのまま使われています。その下には、extractorで指定したデータがロードされています。
中身はこんな感じ。
行の感じが怪しい…。
まさかの半構造型のデータがありました(RECORD型のREPEATED)。なので、実はこれをCSVに出力すると、ガタガタなデータで出てきてしまいます。他のDWHだとどうなるのやら。
実際の書き込みテキストももちろんあります。最後は他人のブログを嘲笑しているように見えますが、これは自分のブログを自嘲しています(ヒマな方はURLを直打ちして確認してみてください)。
集計してみる
せっかくなので、自分のチャネルで一番多く使われたリアクション(絵文字)を集計してみます。
WITH unnest_reaction AS( SELECT channel_id, ts, reactions.name AS name, reactions.COUNT AS COUNT FROM `tamai-rei.tap_slack.messages`, UNNEST(reactions) AS reactions ) SELECT name, SUM(COUNT) AS rcount FROM unnest_reaction GROUP BY name ORDER BY rcount DESC
結果は:kusa:
が一番多かったです。これは他のチャネルでも見られる傾向なので、私の分報が嘲笑われているわけではありません、たぶん。
おわりに
今度がdbtを連携して「T」(Transform)まで一緒にやってみたいです。